Commit 83d571bfcda45c9f895d8d37c02ed8713c8d9610: src/freenet/node/CHKInsertSender.java, CHKInsertSender.realRun (line 335)
Before Change
int highHTLFailureCount = 0;
boolean starting = true;
while(true) {
synchronized(backgroundTransfers) {
if(receiveFailed) {
return; // don't need to set status as killed by CHKInsertHandler
}
}
/*
* If we haven't routed to any node yet, decrement according to the source.
* If we have, decrement according to the node which just failed.
* Because:
* 1) If we always decrement according to source then we can be at max or min HTL
* for a long time while we visit *every* peer node. This is BAD!
* 2) The node which just failed can be seen as the requestor for our purposes.
*/
// Decrement at this point so we can DNF immediately on reaching HTL 0.
boolean canWriteStorePrev = node.canWriteDatastoreInsert(htl);
if((!starting) && (!canWriteStorePrev)) {
// We always decrement on starting a sender.
// However, after that, if our HTL is above the no-cache threshold,
// we do not want to decrement the HTL for trivial rejections (e.g. RejectedLoop),
// because we would end up caching data too close to the originator.
// So allow 5 failures and then RNF.
if(highHTLFailureCount++ >= MAX_HIGH_HTL_FAILURES) {
if(logMINOR) Logger.minor(this, "Too many failures at non-cacheable HTL");
finish(ROUTE_NOT_FOUND, null);
return;
}
if(logMINOR) Logger.minor(this, "Allowing failure "+highHTLFailureCount+" htl is still "+htl);
} else {
htl = node.decrementHTL(sentRequest ? next : source, htl);
if(logMINOR) Logger.minor(this, "Decremented HTL to "+htl);
}
starting = false;
synchronized (this) {
if(htl == 0) {
// Send an InsertReply back
if(!sentRequest)
origTag.setNotRoutedOnwards();
finish(SUCCESS, null);
return;
}
}
if( node.canWriteDatastoreInsert(htl) && (!canWriteStorePrev) && forkOnCacheable) {
// FORK! We are now cacheable, and it is quite possible that we have already gone over the ideal sink nodes,
// in which case if we don't fork we will miss them, and greatly reduce the insert's reachability.
// So we fork: Create a new UID so we can go over the previous hops again if they happen to be good places to store the data.
// Existing transfers will keep their existing UIDs, since they copied the UID in the constructor.
uid = node.clientCore.makeUID();
forkedRequestTag = new InsertTag(false, InsertTag.START.REMOTE, source, realTimeFlag, uid, node);
forkedRequestTag.reassignToSelf();
forkedRequestTag.startedSender();
forkedRequestTag.unlockHandler();
Logger.normal(this, "FORKING CHK INSERT "+origUID+" to "+uid);
nodesRoutedTo.clear();
node.lockUID(uid, false, true, false, false, realTimeFlag, forkedRequestTag);
}
// Route it
// Can backtrack, so only route to nodes closer than we are to target.
next = node.peers.closerPeer(forkedRequestTag == null ? source : null, nodesRoutedTo, target, true, node.isAdvancedModeEnabled(), -1, null,
null, htl, ignoreLowBackoff ? Node.LOW_BACKOFF : 0, source == null);
if(next == null) {
// Backtrack
if(!sentRequest)
origTag.setNotRoutedOnwards();
finish(ROUTE_NOT_FOUND, null);
return;
}
if(logMINOR) Logger.minor(this, "Routing insert to "+next);
nodesRoutedTo.add(next);
Message req;
req = DMT.createFNPInsertRequest(uid, htl, myKey);
if(forkOnCacheable != Node.FORK_ON_CACHEABLE_DEFAULT) {
req.addSubMessage(DMT.createFNPSubInsertForkControl(forkOnCacheable));
}
if(ignoreLowBackoff != Node.IGNORE_LOW_BACKOFF_DEFAULT) {
req.addSubMessage(DMT.createFNPSubInsertIgnoreLowBackoff(ignoreLowBackoff));
}
if(preferInsert != Node.PREFER_INSERT_DEFAULT) {
req.addSubMessage(DMT.createFNPSubInsertPreferInsert(preferInsert));
}
req.addSubMessage(DMT.createFNPRealTimeFlag(realTimeFlag));
InsertTag thisTag = forkedRequestTag;
if(forkedRequestTag == null) thisTag = origTag;
thisTag.addRoutedTo(next, false);
// Send to next node
try {
/*
When using sendSync(), this send can often time out (it is the first request we are sending to this node).
- If sendSync() blocks here (message queue is full, node down, etc.) it can take up to 10 minutes;
if this happens at even two nodes in any given insert (at any point in the path), the entire insert chain
will fatally time out.
- We are not informed if sendSync() does time out. A message will be logged, but this thread will simply continue
to the waitFor() and spend another timeout period there.
- The timeout on the waitFor() is 10 seconds (ACCEPTED_TIMEOUT).
- The interesting case is when the next node is temporarily busy: we might skip a busy node if it does not
respond within ten seconds (ACCEPTED_TIMEOUT). Or, if the send queue to it is so long that the request cannot
be sent within ACCEPTED_TIMEOUT, using sendAsync() means we will skip it before it even gets the request.
Either case would suggest that ACCEPTED_TIMEOUT needs retuning.
*/
next.sendAsync(req, null, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected to "+next);
thisTag.removeRoutingTo(next);
continue;
}
synchronized (this) {
sentRequest = true;
}
boolean failed;
synchronized(backgroundTransfers) {
failed = receiveFailed;
}
if(failed) {
thisTag.removeRoutingTo(next);
return; // don't need to set status as killed by CHKInsertHandler
}
Message msg = null;
if(!waitAccepted(next, thisTag)) {
thisTag.removeRoutingTo(next);
synchronized(backgroundTransfers) {
failed = receiveFailed;
}
if(failed) {
return; // don't need to set status as killed by CHKInsertHandler
}
continue; // Try another node
}
if(logMINOR) Logger.minor(this, "Got Accepted on "+this);
// Send them the data.
// Which might be the new data resulting from a collision...
Message dataInsert;
dataInsert = DMT.createFNPDataInsert(uid, headers);
/** What are we waiting for now?
* - FNPRouteNotFound - couldn't exhaust HTL, but send us the
* data anyway please
* - FNPInsertReply - used up all HTL, yay
* - FNPRejectedOverload - propagating an overload error :(
* - FNPRejectedTimeout - we took too long to send the DataInsert
* - FNPDataInsertRejected - the insert was invalid
*/
MessageFilter mfInsertReply = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPInsertReply);
MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPRejectedOverload);
MessageFilter mfRouteNotFound = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPRouteNotFound);
MessageFilter mfDataInsertRejected = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPDataInsertRejected);
MessageFilter mfTimeout = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPRejectedTimeout);
MessageFilter mf = mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload))));
if(logMINOR) Logger.minor(this, "Sending DataInsert");
synchronized(backgroundTransfers) {
failed = receiveFailed;
}
if(failed) {
thisTag.removeRoutingTo(next);
return; // don't need to set status as killed by CHKInsertHandler
}
try {
next.sendSync(dataInsert, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected sending DataInsert: "+next+" for "+uid);
thisTag.removeRoutingTo(next);
continue;
}
if(logMINOR) Logger.minor(this, "Sending data");
startBackgroundTransfer(next, prb);
while (true) {
synchronized(backgroundTransfers) {
failed = receiveFailed;
}
if(failed) {
thisTag.removeRoutingTo(next);
return; // don't need to set status as killed by CHKInsertHandler
}
try {
msg = node.usm.waitFor(mf, this);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from " + next
+ " while waiting for InsertReply on " + this);
thisTag.removeRoutingTo(next);
break;
}
synchronized(backgroundTransfers) {
failed = receiveFailed;
}
if(failed) {
thisTag.removeRoutingTo(next);
return; // don't need to set status as killed by CHKInsertHandler
}
if (msg == null) {
Logger.error(this, "Timeout on insert "+this+" to "+next);
// First timeout.
// Could be caused by the next node, or could be caused downstream.
next.localRejectedOverload("AfterInsertAcceptedTimeout2");
forwardRejectedOverload();
synchronized(this) {
status = TIMED_OUT;
notifyAll();
}
// Wait for second timeout.
while (true) {
synchronized(backgroundTransfers) {
failed = receiveFailed;
}
if(failed) {
thisTag.removeRoutingTo(next);
return; // don't need to set status as killed by CHKInsertHandler
}
try {
After Change
}
req.addSubMessage(DMT.createFNPRealTimeFlag(realTimeFlag));
InsertTag thisTag = forkedRequestTag;
if(forkedRequestTag == null) thisTag = origTag;
thisTag.addRoutedTo(next, false);
// Send to next node
try {
/*
When using sendSync(), this send can often time out (it is the first request we are sending to this node).
- If sendSync() blocks here (message queue is full, node down, etc.) it can take up to 10 minutes;
if this happens at even two nodes in any given insert (at any point in the path), the entire insert chain
will fatally time out.
- We are not informed if sendSync() does time out. A message will be logged, but this thread will simply continue
to the waitFor() and spend another timeout period there.
- The timeout on the waitFor() is 10 seconds (ACCEPTED_TIMEOUT).
- The interesting case is when the next node is temporarily busy: we might skip a busy node if it does not
respond within ten seconds (ACCEPTED_TIMEOUT). Or, if the send queue to it is so long that the request cannot
be sent within ACCEPTED_TIMEOUT, using sendAsync() means we will skip it before it even gets the request.
Either case would suggest that ACCEPTED_TIMEOUT needs retuning.
*/
next.sendAsync(req, null, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected to "+next);
thisTag.removeRoutingTo(next);
continue;
}
synchronized (this) {
sentRequest = true;
}
if(failIfReceiveFailed(thisTag, next)) return;
Message msg = null;
if(!waitAccepted(next, thisTag)) {
thisTag.removeRoutingTo(next);
if(failIfReceiveFailed(thisTag, next)) return;
continue; // Try another node
}
if(logMINOR) Logger.minor(this, "Got Accepted on "+this);
// Send them the data.
// Which might be the new data resulting from a collision...
Message dataInsert;
dataInsert = DMT.createFNPDataInsert(uid, headers);
/** What are we waiting for now?
* - FNPRouteNotFound - couldn't exhaust HTL, but send us the
* data anyway please
* - FNPInsertReply - used up all HTL, yay
* - FNPRejectedOverload - propagating an overload error :(
* - FNPRejectedTimeout - we took too long to send the DataInsert
* - FNPDataInsertRejected - the insert was invalid
*/
MessageFilter mfInsertReply = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPInsertReply);
MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPRejectedOverload);
MessageFilter mfRouteNotFound = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPRouteNotFound);
MessageFilter mfDataInsertRejected = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPDataInsertRejected);
MessageFilter mfTimeout = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(searchTimeout).setType(DMT.FNPRejectedTimeout);
MessageFilter mf = mfInsertReply.or(mfRouteNotFound.or(mfDataInsertRejected.or(mfTimeout.or(mfRejectedOverload))));
if(logMINOR) Logger.minor(this, "Sending DataInsert");
if(failIfReceiveFailed(thisTag, next)) return;
try {
next.sendSync(dataInsert, this);
} catch (NotConnectedException e1) {
if(logMINOR) Logger.minor(this, "Not connected sending DataInsert: "+next+" for "+uid);
thisTag.removeRoutingTo(next);
continue;
}
if(logMINOR) Logger.minor(this, "Sending data");
startBackgroundTransfer(next, prb);
while (true) {
if(failIfReceiveFailed(thisTag, next)) return;
try {
msg = node.usm.waitFor(mf, this);
} catch (DisconnectedException e) {
Logger.normal(this, "Disconnected from " + next
+ " while waiting for InsertReply on " + this);
thisTag.removeRoutingTo(next);
break;
}
if(failIfReceiveFailed(thisTag, next)) return;
if (msg == null) {
Logger.error(this, "Timeout on insert "+this+" to "+next);
// First timeout.
// Could be caused by the next node, or could be caused downstream.
next.localRejectedOverload("AfterInsertAcceptedTimeout2");
forwardRejectedOverload();
synchronized(this) {
status = TIMED_OUT;
notifyAll();
}
// Wait for second timeout.
while (true) {
if(failIfReceiveFailed(thisTag, next)) return;
try {
msg = node.usm.waitFor(mf, this);
} catch (DisconnectedException e) {
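The visible difference between the two snippets is that every inline receive-failure check in the Before Change code (take the backgroundTransfers lock, read receiveFailed, call thisTag.removeRoutingTo(next), then return) has been folded into a single call, failIfReceiveFailed(thisTag, next). The body of that helper is not part of this excerpt; the sketch below is only a reconstruction from the inline code it replaces, and the parameter types (InsertTag, PeerNode) and the null checks are assumptions rather than text from the commit.

// Sketch only: reconstructed from the inline checks in the Before Change code,
// not copied from the commit. Assumes the backgroundTransfers lock object and the
// receiveFailed flag seen above, plus hypothetical parameter types InsertTag/PeerNode.
private boolean failIfReceiveFailed(InsertTag tag, PeerNode next) {
    boolean failed;
    synchronized(backgroundTransfers) {
        failed = receiveFailed;
    }
    if(failed) {
        // Same cleanup the callers previously did inline: stop tracking the
        // route to 'next' so the tag is not left waiting on it.
        if(tag != null && next != null)
            tag.removeRoutingTo(next);
        // No status update needed: the insert was already killed by CHKInsertHandler.
        return true;
    }
    return false;
}

If the helper does handle removeRoutingTo itself, each call site reduces to a single if(failIfReceiveFailed(thisTag, next)) return;, so the lock/check/cleanup pattern no longer has to be repeated by hand at every point in realRun where the receive may have failed.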